//MIT License
//Copyright (c) 2021 cloudguan rcloudguan@163.com
//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:

//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.

//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.

package sbox

import (
	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
	"gitee.com/dennis-kk/service-box-go/common"
	"gitee.com/dennis-kk/service-box-go/internal/net"
	"gitee.com/dennis-kk/service-box-go/util/slog"
	"github.com/panjf2000/gnet/pkg/ringbuffer"
	"sync/atomic"
)

// BoxChannel implement rpc backend go transport interface
// one connect has it's own channel
type BoxChannel struct {
	transID     uint32                   //channel uuid
	globalIndex protocol.GlobalIndexType // global index for
	status      int32                    // status RUNNING, CLOSE, RECONNECT
	localHost   string                   //local ip port cache
	peerHost    string                   //peer host cache
	conn        net.IBoxConn             // gnet connect handle
	rb          *ringbuffer.RingBuffer   // cache buffer ringbuferr impl
	cache       []byte                   //cache buffer for head
}

func (bch *BoxChannel) GlobalIndex() protocol.GlobalIndexType {
	return bch.globalIndex
}

func (bch *BoxChannel) SetGlobalIndex(uuid protocol.GlobalIndexType) {
	bch.globalIndex = uuid
}

func NewBoxChannel(conn net.IBoxConn) *BoxChannel {
	if conn == nil {
		return nil
	}
	boxchan := &BoxChannel{
		conn:      conn,
		status:    common.ChannelRunning,
		localHost: conn.LocalAddr().String(),
		peerHost:  conn.RemoteAddr().String(),
		rb:        ringbuffer.New(1024 * 16),
		cache:     make([]byte, 16),
	}
	return boxchan
}

func (bch *BoxChannel) LocalAddr() string {
	return bch.localHost
}

func (bch *BoxChannel) RemoteAddr() string {
	return bch.peerHost
}

func (bch *BoxChannel) Write(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	resLen, err := bch.rb.Write(pkg[:])
	if resLen != len(pkg) || err != nil {
		return 0, err
	}

	return resLen, nil
}

func (bch *BoxChannel) Read(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	//FIXME
	resLen, err := bch.rb.Read(pkg)
	if err != nil {
		return 0, err
	}

	return resLen, nil
}

// Peek read buffer without modify read point
func (bch *BoxChannel) Peek(length int) ([]byte, int, error) {
	if bch == nil {
		return nil, 0, common.ErrChannelInvalid
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return nil, 0, common.ErrChannelNotRun
	}

	//not enough buffer cache
	if bch.rb.Length() < length {
		return nil, 0, nil
	}

	head, tail := bch.rb.Peek(length)
	if len(tail) != 0 {
		total := len(head) + len(tail)
		if total > len(bch.cache) {
			bch.cache = make([]byte, total)
		}
		copy(bch.cache, head)
		copy(bch.cache[len(head):], tail)
		return bch.cache, total, nil
	}
	return head, len(head), nil
}

func (bch *BoxChannel) Send(pkg []byte) error {
	if bch == nil {
		return common.ErrChannelInvalid
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return common.ErrChannelNotRun
	}

	if bch.conn == nil {
		return common.ErrChannelConn
	}

	return bch.conn.Write(pkg)
}

//Close active close net channel !
func (bch *BoxChannel) Close() {
	if bch == nil {
		return
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return
	}

	defer func() {
		atomic.StoreInt32(&bch.status, common.ChannelClose)
	}()

	err := bch.conn.Close()
	if err != nil {
		slog.Warn("[BoxChannel] %d active close error %v !", bch.transID, err)
		return
	}
}

//Clean clean channel and reacover
func (bch *BoxChannel) Clean() {
	if bch == nil {
		return
	}
	bch.conn = nil
	bch.cache = nil
	if bch.rb != nil {
		bch.rb.Reset()
		bch.rb = nil
	}
	atomic.StoreInt32(&bch.status, common.ChannelClose)
}

func (bch *BoxChannel) Size() uint32 {
	if bch == nil {
		return 0
	}

	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0
	}

	return uint32(bch.rb.Length())
}

func (bch *BoxChannel) IsClose() bool {
	if bch == nil {
		return true
	}
	if bch.conn == nil {
		return true
	}
	return atomic.LoadInt32(&bch.status) != common.ChannelRunning
}

func (bch *BoxChannel) GetID() uint32 {
	if bch == nil {
		return 0
	}
	return bch.transID
}

func (bch *BoxChannel) SetID(transID uint32) {
	if bch == nil {
		return
	}
	bch.transID = transID
}
